
Join room and start pipeline#2

Open
TomFanella4 wants to merge 5 commits intomainfrom
join-room-and-start-pipeline

Conversation


@TomFanella4 TomFanella4 commented Apr 22, 2026



Summary by CodeRabbit

  • Refactor

    • Refactored initialization workflow to run room setup and pipeline startup concurrently, reducing overall startup time
    • Improved internal callback management for track publication events, enhancing system reliability during concurrent operations
  • Chores

    • Updated golang.org/x/sync to be tracked as a direct dependency

coderabbitai Bot commented Apr 22, 2026

Walkthrough

The changes update Go dependencies, refactor the publisher's startup method to use error groups for concurrent room setup and pipeline initialization, and convert the track's end-of-stream callback to use atomic pointers for thread-safe concurrent access.

Changes

Cohort / File(s) — Summary

  • Dependency Update — go.mod: Promotes golang.org/x/sync (v0.17.0) from an indirect to a direct dependency, enabling use of error groups.
  • Concurrent Publisher Startup — publish.go: Refactors the Start() method to execute room setup/track publication and pipeline initialization concurrently using errgroup.Group, replacing sequential execution with synchronized goroutines.
  • Atomic Callback Handling — track.go: Converts publisherTrack.onEOS from a direct function field to an atomic.Pointer[func()], enabling thread-safe concurrent callback updates and invocations.
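The atomic-pointer callback pattern the walkthrough describes can be sketched as follows. This is an illustrative stand-in, not the repo's actual track.go; the type and method names mirror the summary but are assumptions:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// publisherTrack holds its end-of-stream callback behind an
// atomic.Pointer, so one goroutine can Store a new callback while
// another concurrently Loads and invokes it, without a mutex.
type publisherTrack struct {
	onEOS atomic.Pointer[func()]
}

// handleEOS loads the callback atomically. A nil Load means no
// callback has been registered yet, and the EOS is silently dropped.
func (t *publisherTrack) handleEOS() {
	if cb := t.onEOS.Load(); cb != nil {
		(*cb)()
	}
}

// demo registers a callback and fires EOS, reporting whether it ran.
func demo() bool {
	t := &publisherTrack{}
	t.handleEOS() // safe no-op: nothing stored yet

	fired := false
	cb := func() { fired = true }
	t.onEOS.Store(&cb)
	t.handleEOS()
	return fired
}

func main() {
	fmt.Println("callback fired:", demo())
}
```

Note that `Store` takes a `*func()`, so the callback must first be bound to a local variable whose address is stored.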

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 Hops through concurrency with atomic grace,
Goroutines dance at a steadier pace,
The publisher now runs both fast and true,
Error groups sync what was once askew! 🚀

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
  • Description Check — ✅ Passed: Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check — ✅ Passed: The title 'Join room and start pipeline' directly matches the main changes in the PR, which refactors publish.go to use errgroup for concurrent room joining and pipeline startup.
  • Docstring Coverage — ✅ Passed: No functions found in the changed files to evaluate docstring coverage; check skipped.
  • Linked Issues Check — ✅ Passed: Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes Check — ✅ Passed: Check skipped because no linked issues were found for this pull request.



@murilo-teleo murilo-teleo left a comment


I have no idea what's going on with the onEOS changes, but it sounds right. I don't know enough about Go's internals to catch the nuances here.

Base automatically changed from handle-pli to main April 27, 2026 16:58

@devin-ai-integration devin-ai-integration Bot left a comment


Devin Review found 1 potential issue.

View 5 additional findings in Devin Review.


Comment thread publish.go
Comment on lines +80 to +120
g.Go(func() error {
	cb := lksdk.NewRoomCallback()
	cb.OnDisconnected = func() {
		// TODO: stop publishing and exit
	}
	p.room = lksdk.NewRoom(cb)
	if err := p.room.JoinWithToken(p.params.URL, p.params.Token,
		lksdk.WithAutoSubscribe(false),
	); err != nil {
		return err
	}

	if p.videoTrack != nil {
		pub, err := p.room.LocalParticipant.PublishTrack(p.videoTrack.track, &lksdk.TrackPublicationOptions{
			Source: livekit.TrackSource_CAMERA,
		})
		if err != nil {
			return err
		}
		p.videoTrack.publication = pub
		onEOS := func() {
			_ = p.room.LocalParticipant.UnpublishTrack(pub.SID())
		}
		p.videoTrack.onEOS.Store(&onEOS)
	}

	if p.audioTrack != nil {
		pub, err := p.room.LocalParticipant.PublishTrack(p.audioTrack.track, &lksdk.TrackPublicationOptions{
			Source: livekit.TrackSource_MICROPHONE,
		})
		if err != nil {
			return err
		}
		p.audioTrack.publication = pub
		onEOS := func() {
			_ = p.room.LocalParticipant.UnpublishTrack(pub.SID())
		}
		p.audioTrack.onEOS.Store(&onEOS)
	}
	return nil
})

🟡 Race: pipeline EOS can fire before onEOS callback is registered, causing per-track unpublish to be skipped

The parallelization of room connection and pipeline startup introduces a race where handleEOS (track.go:95-99) can fire from a GStreamer streaming thread before the room goroutine stores the onEOS callback via atomic.Pointer.Store (publish.go:103 / publish.go:117). In the old sequential code, onEOS was always set before pipeline.Start(), so this race didn't exist.

When handleEOS fires with onEOS still nil, isEnded is set to true but the per-track UnpublishTrack callback never executes. If the room goroutine later publishes the track and stores onEOS, that callback will never fire because GStreamer won't deliver EOS again. In a multi-track pipeline where one stream ends before the other (e.g., video ends but audio continues), the ended track remains published (producing no data) instead of being explicitly unpublished. Room-level cleanup in Stop() eventually handles it when the entire pipeline ends, but this is a behavioral regression from the sequential code.

Prompt for agents
The race exists because the pipeline starts producing data (and can reach EOS) concurrently with the room goroutine that registers the onEOS callback. One approach to fix this is to use errgroup.WithContext to get a context, but that alone won't solve the logical ordering issue.

A cleaner fix would be to keep the parallel startup but defer publishing/onEOS registration until after both goroutines succeed. For example:
1. Run room.JoinWithToken and pipeline.Start in parallel via errgroup
2. After g.Wait succeeds, publish the tracks and register onEOS callbacks sequentially

This preserves the parallelism benefit (room connect and pipeline startup overlap) while ensuring tracks are only published after the pipeline is confirmed running, and onEOS is set before any EOS can arrive from GStreamer (since the pipeline is in PLAYING state but any queued EOS messages would be processed on the main loop which hasn't started yet at that point).

Alternatively, if EOS can fire from streaming threads even after Start returns, you could have handleEOS check isEnded and then have the room goroutine check isEnded after storing onEOS, firing it manually if EOS already happened. This would close the race window.

Relevant code paths:
- publish.go: Start() method, the two g.Go closures
- track.go: handleEOS method, onEOS atomic pointer
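The alternative fix in the last paragraph — have handleEOS record that EOS happened, and have the registering goroutine fire the callback manually if it already did — can be sketched like this. Names (`track`, `registerOnEOS`) are illustrative, not the repo's actual code:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// track records EOS in an atomic flag alongside the atomic callback
// pointer, so a callback registered after EOS is not silently lost.
type track struct {
	isEnded atomic.Bool
	onEOS   atomic.Pointer[func()]
}

// handleEOS marks the track ended, then invokes the callback if one
// is registered. If none is registered yet, isEnded remembers the EOS.
func (t *track) handleEOS() {
	t.isEnded.Store(true)
	if cb := t.onEOS.Load(); cb != nil {
		(*cb)()
	}
}

// registerOnEOS closes the race window: if EOS already fired before
// the Store, the callback is invoked here instead of never firing.
// A real implementation would also guard against double invocation
// (e.g. a CompareAndSwap on a "fired" flag) if handleEOS can run
// concurrently with registration.
func (t *track) registerOnEOS(cb func()) {
	t.onEOS.Store(&cb)
	if t.isEnded.Load() {
		cb()
	}
}

// demo simulates EOS arriving before the callback is registered.
func demo() bool {
	t := &track{}
	t.handleEOS() // EOS fires first; no callback stored yet

	unpublished := false
	t.registerOnEOS(func() { unpublished = true })
	return unpublished
}

func main() {
	fmt.Println("unpublished:", demo())
}
```

In the sequential code this ordering was guaranteed; the flag-check only matters once registration and EOS can interleave.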

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
publish.go (1)

142-156: ⚠️ Potential issue | 🟠 Major

Stop() is not safe against concurrent invocation.

With this PR, Stop() can now be called from at least three sites: the errgroup error path (line 127), the signal goroutine (line 135), and messageWatch on EOS/error (lines 163, 169). The current implementation reads/writes p.pipeline, p.room, p.loop without any guard or sync.Once, so two concurrent callers can race (e.g., one observes p.pipeline != nil while another nils it, leading to a nil deref on BlockSetState, or Disconnect being called twice).

Consider wrapping the body in a sync.Once:

🛡️ Proposed fix
 type Publisher struct {
 	params     PublisherParams
 	pipeline   *gst.Pipeline
 	loop       *glib.MainLoop
 	videoTrack *publisherTrack
 	audioTrack *publisherTrack
 	room       *lksdk.Room
+	stopOnce   sync.Once
 }
 ...
 func (p *Publisher) Stop() {
-	logger.Infow("stopping publisher..")
-	if p.pipeline != nil {
-		p.pipeline.BlockSetState(gst.StateNull)
-		p.pipeline = nil
-	}
-	if p.room != nil {
-		p.room.Disconnect()
-		p.room = nil
-	}
-	if p.loop != nil {
-		p.loop.Quit()
-		p.loop = nil
-	}
+	p.stopOnce.Do(func() {
+		logger.Infow("stopping publisher..")
+		if p.pipeline != nil {
+			p.pipeline.BlockSetState(gst.StateNull)
+			p.pipeline = nil
+		}
+		if p.room != nil {
+			p.room.Disconnect()
+			p.room = nil
+		}
+		if p.loop != nil {
+			p.loop.Quit()
+			p.loop = nil
+		}
+	})
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@publish.go` around lines 142 - 156, Stop() is not concurrency-safe: multiple
goroutines may concurrently read/write Publisher.pipeline, Publisher.room and
Publisher.loop causing nil-derefs or double-close; modify the Publisher type to
include a sync.Once (or a dedicated mutex + closed flag) and wrap the entire
body of Stop() so its teardown logic (references to p.pipeline.BlockSetState,
p.room.Disconnect, p.loop.Quit and subsequent nil assignments) runs exactly
once; ensure any callers that previously relied on idempotent behavior still
work by making Stop() safe to call repeatedly and document/maintain idempotence
for Stop().
🧹 Nitpick comments (2)
publish.go (1)

82-84: TODO: OnDisconnected is a no-op.

If the SDK signals disconnect (e.g., server-initiated kick, network drop after retries), the publisher will keep the pipeline running and silently produce into a dead room. At minimum this should call p.Stop() so the process exits cleanly.

 		cb.OnDisconnected = func() {
-			// TODO: stop publishing and exit
+			logger.Infow("room disconnected, stopping publisher")
+			p.Stop()
 		}

Want me to wire this up in a follow-up commit / open a tracking issue?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@publish.go` around lines 82 - 84, The cb.OnDisconnected handler is currently
a no-op and must stop the publisher so the process exits cleanly when the SDK
signals a disconnect; update the cb.OnDisconnected assignment to call the
publisher's Stop method (p.Stop()) and perform any necessary cleanup (e.g.,
cancel contexts, close streams) inside that handler so the pipeline doesn't keep
producing into a dead room—locate the cb.OnDisconnected closure in publish.go
and invoke p.Stop() (and related shutdown helpers already present) from there.
go.mod (1)

82-82: LGTM — direct dependency promotion is correct.

golang.org/x/sync/errgroup is now imported directly by publish.go, so dropping the // indirect marker is appropriate. Note that the require block at lines 23–91 mixes direct and indirect deps; consider running go mod tidy so the formatter splits them into separate require blocks for clarity.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@go.mod` at line 82, The go.mod currently promotes golang.org/x/sync to a
direct dependency because publish.go now imports golang.org/x/sync/errgroup; run
`go mod tidy` to recompute and clean up module requirements so direct and
indirect dependencies are separated and formatted into appropriate require
blocks, which will also remove stale `// indirect` markers and tidy the require
block mixing lines 23–91; verify publish.go's import of errgroup is present and
then commit the updated go.mod and go.sum.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 82ed8951-63e6-4d3b-8d85-bd72f85d434d

📥 Commits

Reviewing files that changed from the base of the PR and between e5493a8 and 5d9ff16.

📒 Files selected for processing (3)
  • go.mod
  • publish.go
  • track.go

Comment thread publish.go
Comment on lines +80 to +124
g.Go(func() error {
	cb := lksdk.NewRoomCallback()
	cb.OnDisconnected = func() {
		// TODO: stop publishing and exit
	}
	p.room = lksdk.NewRoom(cb)
	if err := p.room.JoinWithToken(p.params.URL, p.params.Token,
		lksdk.WithAutoSubscribe(false),
	); err != nil {
		return err
	}

	if p.videoTrack != nil {
		pub, err := p.room.LocalParticipant.PublishTrack(p.videoTrack.track, &lksdk.TrackPublicationOptions{
			Source: livekit.TrackSource_CAMERA,
		})
		if err != nil {
			return err
		}
		p.videoTrack.publication = pub
		onEOS := func() {
			_ = p.room.LocalParticipant.UnpublishTrack(pub.SID())
		}
		p.videoTrack.onEOS.Store(&onEOS)
	}

	if p.audioTrack != nil {
		pub, err := p.room.LocalParticipant.PublishTrack(p.audioTrack.track, &lksdk.TrackPublicationOptions{
			Source: livekit.TrackSource_MICROPHONE,
		})
		if err != nil {
			return err
		}
		p.audioTrack.publication = pub
		onEOS := func() {
			_ = p.room.LocalParticipant.UnpublishTrack(pub.SID())
		}
		p.audioTrack.onEOS.Store(&onEOS)
	}
	return nil
})

g.Go(func() error {
	return p.pipeline.Start()
})

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

golang.org/x/sync errgroup error cancellation behavior without context

💡 Result:

In golang.org/x/sync/errgroup, a plain errgroup.Group (created as a zero value, without calling WithContext) has no context cancellation capability. When using g.Go(f), all submitted functions run to completion in their goroutines regardless of errors returned by other functions. The first non-nil error is captured and returned by g.Wait after all goroutines finish, but no cancellation occurs and the remaining goroutines continue executing without interruption. This differs from errgroup.WithContext(ctx), where the group has an associated context that gets canceled (via the internal cancel func) when the first error is encountered; goroutines can then check ctx.Err or use context-aware operations to exit early. Key evidence from the source code:
  • The Group struct has cancel func(error), which is nil in a plain Group.
  • In Go/TryGo: if err != nil, g.errOnce.Do sets g.err and calls g.cancel(g.err) only if g.cancel != nil.
  • Wait calls g.wg.Wait (blocking until all are done), then calls g.cancel(g.err) if g.cancel != nil, then returns g.err.
A plain Group behaves like sync.WaitGroup plus error collection: it always waits for all goroutines, with no cancellation on error. Use WithContext explicitly for cancellation behavior.

🏁 Scripts executed (condensed): located publish.go in the repository (MainSquare/gstreamer-publisher), read the Start() method (lines 60–140) and the Stop() method (lines 142–200), searched publish.go for onEOS usage, and read track.go in full to confirm the atomic.Pointer callback structure and the handleEOS path.


Reconsider concurrent ordering of pipeline start vs. room join/publish.

p.pipeline.Start() is launched in parallel with JoinWithToken + PublishTrack. Three issues:

  1. pipeline.Start() returns quickly after issuing an async state change to PLAYING, so the concurrency buys very little — the room-join goroutine is the only one that actually takes time. Meanwhile the pipeline begins producing samples forwarded to t.track.WriteSample() before the track is published, so initial encoded frames (including any keyframe at the head of the stream) are emitted into a track with no subscribers yet. For video this can extend time-to-first-frame for early subscribers until the next keyframe interval.

  2. Race condition in onEOS callback: The closure stored via p.videoTrack.onEOS.Store() and p.audioTrack.onEOS.Store() accesses p.room.LocalParticipant when invoked later (in handleEOS). Concurrently, p.Stop() sets p.room = nil from the signal handler, message watch, or error path. If handleEOS executes after Stop() has cleared p.room, a nil pointer dereference occurs.

  3. errgroup.Group without context does not propagate cancellation — if JoinWithToken fails, p.pipeline.Start() continues transitioning to PLAYING until g.Wait() returns and p.Stop() runs, leaving a window where the pipeline produces into discarded state.

Consider starting the pipeline only after the room is joined and tracks are published, and use errgroup.WithContext if you keep the parallel structure for future extension.

♻️ Sketch of a sequential ordering
-	var g errgroup.Group
-
-	g.Go(func() error {
-		cb := lksdk.NewRoomCallback()
-		...
-	})
-
-	g.Go(func() error {
-		return p.pipeline.Start()
-	})
-
-	if err := g.Wait(); err != nil {
-		p.Stop()
-		return err
-	}
+	if err := p.joinAndPublish(); err != nil {
+		p.Stop()
+		return err
+	}
+	if err := p.pipeline.Start(); err != nil {
+		p.Stop()
+		return err
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@publish.go` around lines 80 - 124, The pipeline is started concurrently with
JoinWithToken/PublishTrack causing early samples to be emitted before tracks are
published, a race on the onEOS closures accessing p.room after p.Stop clears it,
and lack of cancellation propagation; fix by sequencing Start after successful
room join and track publications (move the call to p.pipeline.Start() to run
only after JoinWithToken and both PublishTrack calls complete), change how you
store onEOS closures so they capture a safe reference (e.g., capture pub.SID()
and call Unpublish via a safe helper that checks p.room != nil) instead of
directly dereferencing p.room.LocalParticipant, and if you retain parallelism
replace g := errgroup.Group with errgroup.WithContext to propagate cancellation
from JoinWithToken failures to the pipeline start/operation.

Comment thread publish.go
Comment on lines +100 to +103
onEOS := func() {
_ = p.room.LocalParticipant.UnpublishTrack(pub.SID())
}
p.videoTrack.onEOS.Store(&onEOS)

⚠️ Potential issue | 🟡 Minor

Possible nil-pointer race: onEOS closure dereferences p.room which Stop() nils.

The onEOS closure calls p.room.LocalParticipant.UnpublishTrack(pub.SID()), but Stop() (lines 148–151) sets p.room = nil after disconnecting. EOS can be triggered from the GStreamer thread (handleEOS in track.go) concurrently with Stop() being invoked from messageWatch (on MessageEOS/MessageError) or the signal goroutine, leaving a window where the closure dereferences a nil p.room.

Capture p.room (or just p.room.LocalParticipant) in the closure to make the callback self-contained, or guard the access:

🛡️ Proposed fix
-		if p.videoTrack != nil {
-			pub, err := p.room.LocalParticipant.PublishTrack(p.videoTrack.track, &lksdk.TrackPublicationOptions{
+		if p.videoTrack != nil {
+			lp := p.room.LocalParticipant
+			pub, err := lp.PublishTrack(p.videoTrack.track, &lksdk.TrackPublicationOptions{
 				Source: livekit.TrackSource_CAMERA,
 			})
 			if err != nil {
 				return err
 			}
 			p.videoTrack.publication = pub
 			onEOS := func() {
-				_ = p.room.LocalParticipant.UnpublishTrack(pub.SID())
+				_ = lp.UnpublishTrack(pub.SID())
 			}
 			p.videoTrack.onEOS.Store(&onEOS)
 		}

(apply the same pattern to the audio block at lines 106–118.)

Also applies to: 114-117

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@publish.go` around lines 100 - 103, The onEOS closures assigned via
p.videoTrack.onEOS.Store and p.audioTrack.onEOS.Store capture p.room which
Stop() can nil, risking a nil-pointer race; fix by capturing the
LocalParticipant (e.g., lp := p.room.LocalParticipant) into a local variable
before creating the closure and use lp.UnpublishTrack(pub.SID()) inside the
closure (or check for nil lp inside the closure) so the callback is
self-contained and safe against concurrent Stop() niling p.room; apply the same
change to both the video and audio onEOS closures and ensure handleEOS in
track.go/messageWatch interactions remain safe.
